/*
 * Copyright 2020 Wuyi Chen.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.kafkadefinitiveguide.producer.serializer.avroserializer;

import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
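 * Sends {@link GenericRecord} messages using the Avro serializer.
 *
 * <p>Customer is not a POJO and is not a class generated by avro-tools.jar; it is
 * a generic Avro record built at runtime from the schema string parsed in {@code main}.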
 *
 * @author  Wuyi Chen
 * @date    06/03/2020
 * @version 1.0
 * @since   1.0
 */
public class GenericAvroRecordProducer {
	/**
	 * Builds ten sample {@code Customer} records from an inline Avro schema and
	 * sends them to the {@code customerContacts} topic, keyed by customer name.
	 *
	 * <p>The producer is closed before this method returns so that records
	 * handed to the asynchronous {@code send} call are not lost on JVM exit.
	 *
	 * @param args command-line arguments (not used)
	 */
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("bootstrap.servers",   "localhost:9092");
		props.put("key.serializer",      "io.confluent.kafka.serializers.KafkaAvroSerializer");
		props.put("value.serializer",    "io.confluent.kafka.serializers.KafkaAvroSerializer");
		props.put("schema.registry.url", "http://localhost:8081");                     // URL of the schema registry; it must include the scheme.

		// NOTE(review): the default of "email" is the JSON string "null", not a JSON null.
		// With the union declared as ["string","null"] that is a string default - confirm
		// whether ["null","string"] with a real null default was intended.
		String schemaString = "{\"namespace\": \"customerManagement.avro\"," +         // Provide the Avro schema
				"\"type\": \"record\", " +
				"\"name\": \"Customer\"," +
				"\"fields\": [" +
				"{\"name\": \"id\", \"type\": \"int\"}," +
				"{\"name\": \"name\", \"type\": \"string\"}," +
				"{\"name\": \"email\", \"type\": " + "[\"string\",\"null\"], " +
				"\"default\":\"null\" }" +
				"]}";

		// Parse the schema before opening the producer so that a malformed schema
		// fails fast without leaving an open producer behind.
		Schema schema = new Schema.Parser().parse(schemaString);

		int customers = 10;

		// try-with-resources closes the producer, which waits for the records
		// already passed to send() to complete before main returns.
		try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
			for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
				String name = "exampleCustomer" + nCustomers;
				String email = "example" + nCustomers + "@example.com";                // No space: keeps the sample address well-formed.

				GenericRecord customer = new GenericData.Record(schema);
				customer.put("id", nCustomers);
				customer.put("name", name);
				customer.put("email", email);

				ProducerRecord<String, GenericRecord> data = new ProducerRecord<>("customerContacts", name, customer);
				producer.send(data);
			}
		}
	}
}
